主要包括: 令牌生成(processId) + 事件通知.
令牌生成:
基于AtomicLong.inc()机制,(纯内存机制,解决同机房,单节点同步需求,不需要多节点交互) 基于zookeeper的自增id机制,(解决异地机房,多节点协作同步需求) 事件通知: (简单原理: 每个stage都会有个block queue,接收上一个stage的single信号通知,当前stage会阻塞在该block queue上,直到有信号通知)
block queue + put/take方法,(纯内存机制) block queue + rpc + put/take方法 (两个stage对应的node不同,需要rpc调用,需要依赖负载均衡算法解决node节点的选择问题) block queue + zookeeper watcher ()
在processSelect()(SelectTask.java)函数里,可以看到这个ProcessId来自于EtlEventData,最后构成一个Identity放在rowBatch中的
final EtlEventData etlEventData = arbitrateEventService.selectEvent().await(pipelineId);
RowBatch rowBatch = new RowBatch();
// 构造唯一标识
Identity identity = new Identity();
identity.setChannelId(channel.getId());
identity.setPipelineId(pipelineId);
identity.setProcessId(etlEventData.getProcessId());
rowBatch.setIdentity(identity);
关键看这个ProcessId
MemoryStageController中生成ProcessId,这个atomicMaxProcessId是一个AtomicLong
private synchronized void initSelect() {
// 第一次/出现ROLLBACK/RESTART事件,删除了所有调度信号后,重新初始化一下select
// stage的数据,初始大小为并行度大小
// 后续的select的reply队列变化,由load single时直接添加
try{
ReplyProcessQueue queue = replys.get(StageType.SELECT);
int parallelism = ArbitrateConfigUtils.getParallelism(getPipelineId());
while (parallelism-- > 0 && queue.size() <= parallelism) {
queue.offer(atomicMaxProcessId.incrementAndGet());
}
} catch (ExecutionException e) {
e.printStackTrace();
}
}